-
Notifications
You must be signed in to change notification settings - Fork 188
relay: add pullBinlogs interface for dm-worker to get local/grpc relay log #2216
Conversation
[REVIEW NOTIFICATION] This pull request has been approved by:
To complete the pull request process, please ask the reviewers in the list to review by filling The full list of commands accepted by this bot can be found here. Reviewer can indicate their review by submitting an approval review. |
/cc @D3Hunter |
@lichunzhu: GitHub didn't allow me to request PR reviews from the following users: D3Hunter. Note that only pingcap members and repo collaborators can review this PR, and authors cannot review their own PRs. In response to this:
Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will review later
@@ -940,3 +940,42 @@ func (s *Server) GetWorkerCfg(ctx context.Context, req *pb.GetWorkerCfgRequest) | |||
resp.Cfg, err = s.cfg.Toml() | |||
return resp, err | |||
} | |||
|
|||
// PullBinlogs will start a goroutine to continuously parse binlogs in relay dir and send them in streaming way. | |||
func (s *Server) PullBinlogs(req *pb.PullBinlogReq, stream pb.Worker_PullBinlogsServer) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this api's operating on SourceWorker layer
also we may have more than one SourceWorker later
maybe it (or part of the implementation of it) can move into SourceWorker
layer, not Server
layer
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We will only register one grpc server service in dm-worker Server
layer so I put this function here. How to register grpc server if we put this function to SourceWorker
layer?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
or part of the implementation of it, then server calls it, to separate the api and the work(belongs to SourceWorker
i think)
|
||
message PullBinlogReq { | ||
// Specifies which source of binlog to pull. | ||
string source = 1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what's the value of source? source-name or server-id?
there's another layer of sequence in relay dir, can it handle that?
<deploy_dir>/relay_log/
|-- 7e427cc0-091c-11e9-9e45-72b7c59d52d7.000001
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Source
is source-name. UUID is the detail that relay to think about. For the dm-worker layer I think we'd better use source.- Current relay can handle this so I think it's okay.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
relay log filename maybe the same under different relay dir, for pos-based sync, it maybe ambiguous
<deploy_dir>/relay_log/
|-- 7e427cc0-091c-11e9-9e45-72b7c59d52d7.000001
--|_mysql-000001
|-- another-server-under-load-balancer.000001
--|_mysql-000001
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see what you mean. Relay replicated by binlog position shouldn't support switch MySQL leader/follower. So I think using Source
doesn't have a problem.
dm/worker/server.go
Outdated
return terror.ErrWorkerPullBinlogsInvalidRequest.Generate("worker doesn't enable relay") | ||
} | ||
|
||
ch, ech := s.worker.relayHolder.PullBinlogs(ctx, req) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s.worker
-> w
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rest LGTM
ping @lance6716 @Ehco1996 @D3Hunter |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
@@ -222,7 +223,8 @@ type Syncer struct { | |||
} | |||
|
|||
// NewSyncer creates a new Syncer. | |||
func NewSyncer(cfg *config.SubTaskConfig, etcdClient *clientv3.Client) *Syncer { | |||
func NewSyncer(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, | |||
pullLocalBinlogs func(context.Context, *pb.PullBinlogReq) (chan *replication.BinlogEvent, chan error)) *Syncer { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why syncer
needs to care about api PullBinlog
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
because streamerController
in sycner need to use this to pull and control binlog 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In this PR, create reader is on relay
and syncer
need to access relay
's createReader
method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
even with that design, it would be better to abstract it, not passing a function(with full signature) around, it's hard to extend and read.
Location startFrom = 2; | ||
} | ||
|
||
message PullBinlogResp { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to include a error field here? What if the source is not exist in the target worker?
@lichunzhu: PR needs rebase. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository. |
merge this in next sprint |
BTW, the perfect interface in my mind is using MySQL replication protocal, so someday we can decouple "relay log" functionalities from DM worker, DM worker will transparently connect to relay log nodes or real upstream. cc @sunzhaoyang |
What problem does this PR solve?
Close #2214
What is changed and how it works?
Check List
Tests
Code changes
Related changes